AWS Bedrock Knowledge Base

AWS Bedrock RAG (Retrieval-Augmented Generation) Guide

Overview

RAG (Retrieval-Augmented Generation) is a technique that enhances Large Language Models (LLMs) by providing them with relevant external knowledge retrieved from a database or document store. Instead of relying solely on the model's training data, RAG allows models to access up-to-date, domain-specific, or private information.

AWS Bedrock provides two main approaches for implementing RAG:
1. Managed Knowledge Bases - Fully managed RAG solution
2. Custom RAG - Build your own using Bedrock APIs and vector databases

Key Benefits:
- Access to current and private data
- Reduced hallucinations
- Source attribution
- No model retraining required
- Cost-effective knowledge updates

What is RAG?

The Problem RAG Solves

Traditional LLM Limitations:

User: "What's our company's Q4 revenue?"

Traditional LLM:
❌ "I don't have access to your company's financial data"
❌ "Based on my training data from 2023..." (outdated)
❌ Makes up numbers (hallucination)

Problem:
- LLMs only know what they were trained on
- Training data has a cutoff date
- No access to private/proprietary data
- Cannot access real-time information

RAG Solution:

User: "What's our company's Q4 revenue?"

RAG-Enhanced LLM:
1. Retrieve: Search company documents for "Q4 revenue"
2. Find: "Q4 2024 revenue was $50M, up 25% YoY"
3. Generate: "According to the Q4 financial report, 
             revenue was $50M, representing 25% growth."
✅ Accurate, current, sourced information

How RAG Works

RAG Process Flow:

┌─────────────────────────────────────────────────────────────┐
│                    USER QUERY                                │
│         "What is our return policy for electronics?"         │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  STEP 1: QUERY PROCESSING                                    │
│  • Convert query to embedding vector                         │
│  • Optimize for semantic search                              │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  STEP 2: RETRIEVAL                                           │
│  • Search vector database                                    │
│  • Find semantically similar documents                       │
│  • Rank by relevance score                                   │
│                                                              │
│  Retrieved Documents:                                        │
│  1. "Electronics can be returned within 30 days..." (0.95)  │
│  2. "Original packaging required for returns..." (0.87)     │
│  3. "Refunds processed within 5-7 business days..." (0.82)  │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  STEP 3: AUGMENTATION                                        │
│  • Combine retrieved context with user query                 │
│  • Create enhanced prompt for LLM                            │
│                                                              │
│  Enhanced Prompt:                                            │
│  "Based on these company policies:                           │
│   [Retrieved Document 1]                                     │
│   [Retrieved Document 2]                                     │
│   [Retrieved Document 3]                                     │
│                                                              │
│   Answer: What is our return policy for electronics?"       │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  STEP 4: GENERATION                                          │
│  • LLM processes enhanced prompt                             │
│  • Generates response using retrieved context                │
│  • Includes source citations                                 │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                    RESPONSE                                  │
│  "Electronics can be returned within 30 days of purchase     │
│   with original packaging. Refunds are processed within      │
│   5-7 business days.                                         │
│                                                              │
│   Source: Return Policy Document, Section 3.2"              │
└─────────────────────────────────────────────────────────────┘

Detailed Workflow:

# Simplified RAG workflow
def rag_pipeline(user_query):
    # 1. Convert query to embedding
    query_embedding = embed_text(user_query)
    # Result: [0.23, -0.45, 0.67, ...] (1536-dim vector)

    # 2. Search vector database
    similar_docs = vector_db.search(
        query_embedding,
        top_k=5,
        min_score=0.7
    )
    # Result: [
    #   {"text": "...", "score": 0.95, "source": "policy.pdf"},
    #   {"text": "...", "score": 0.87, "source": "faq.pdf"}
    # ]

    # 3. Build augmented prompt
    context = "\n\n".join([doc["text"] for doc in similar_docs])
    augmented_prompt = f"""
    Use this information to answer the question:

    {context}

    Question: {user_query}

    Provide an accurate answer and cite your sources.
    """

    # 4. Generate response
    response = llm.generate(augmented_prompt)

    return response

RAG vs Other Approaches

| Approach | How It Works | Pros | Cons | Use Case |
|----------|--------------|------|------|----------|
| Base LLM | Use model as-is | Simple, fast | Limited knowledge, outdated | General Q&A |
| Fine-tuning | Retrain model on custom data | Model learns domain | Expensive, static | Specialized domains |
| Prompt Engineering | Add context in prompt | Flexible | Token limits | Small context |
| RAG | Retrieve + Generate | Dynamic, scalable | Complexity | Large knowledge bases |

Comparison Example:

Question: "What's the status of order #12345?"

┌─────────────────────────────────────────────────────────────┐
│ BASE LLM                                                     │
│ "I don't have access to order information"                  │
│ ❌ Cannot access external data                              │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ FINE-TUNED MODEL                                            │
│ "Orders typically take 3-5 days to ship"                    │
│ ⚠️  Generic answer, not specific to order #12345           │
│ 💰 Expensive to retrain for every order update             │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ PROMPT ENGINEERING                                          │
│ Include all orders in prompt: "Order 12345: shipped..."    │
│ ⚠️  Hits token limits with many orders                     │
│ 💰 Expensive to include all data in every request          │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ RAG                                                         │
│ 1. Search order database for #12345                        │
│ 2. Retrieve: "Order #12345 shipped on Jan 15, tracking..." │
│ 3. Generate: "Your order #12345 was shipped on January 15  │
│    via FedEx. Tracking: 1Z999..."                          │
│ ✅ Accurate, current, specific                             │
│ ✅ Scales to millions of orders                            │
│ ✅ Updates automatically                                   │
└─────────────────────────────────────────────────────────────┘

Why Use RAG?

1. Access to Private/Current Data

Problem: LLMs don't know your company's data or recent events

RAG Solution:

# Access private company data
query = "What were the key decisions from yesterday's board meeting?"

# RAG retrieves from internal documents
retrieved = [
    "Board Meeting Minutes - Jan 15, 2025",
    "Decision: Approved $5M budget for AI initiative",
    "Decision: Expanded to European market in Q2"
]

# LLM generates answer with current, private information
response = """Based on yesterday's board meeting, two key decisions were made:
1. Approval of $5M budget for AI initiative
2. Plans to expand to European market in Q2 2025"""

2. Reduced Hallucinations

Problem: LLMs sometimes make up information

Without RAG:

User: "What's the warranty on Model X-2000?"
LLM: "The Model X-2000 comes with a 2-year warranty"
❌ Made up answer (actual warranty is 3 years)

With RAG:

User: "What's the warranty on Model X-2000?"

Retrieved: "Model X-2000 Specifications: 3-year comprehensive warranty"

LLM: "The Model X-2000 includes a 3-year comprehensive warranty.
     Source: Product Specifications Document"
✅ Accurate answer based on retrieved facts

3. Cost-Effective Knowledge Updates

Fine-tuning Approach:

New product launched → Retrain entire model → $$$
Policy updated → Retrain entire model → $$$
Price changed → Retrain entire model → $$$

Cost: $10,000+ per update
Time: Days to weeks

RAG Approach:

New product launched → Add document to knowledge base → $
Policy updated → Update document → $
Price changed → Update document → $

Cost: Pennies per update
Time: Minutes

4. Source Attribution

RAG provides citations:

response = {
    "answer": "Our return policy allows 30-day returns for electronics",
    "sources": [
        {
            "document": "Return Policy v2.3",
            "page": 5,
            "section": "Electronics Returns",
            "confidence": 0.95
        }
    ]
}

Benefits: - ✅ Verify accuracy - ✅ Build trust - ✅ Audit trail - ✅ Compliance

5. Domain Expertise

RAG enables instant domain experts:

Medical RAG:
Knowledge Base: Medical journals, research papers, clinical guidelines
Result: AI assistant with medical knowledge

Legal RAG:
Knowledge Base: Case law, statutes, legal precedents
Result: AI assistant with legal knowledge

Financial RAG:
Knowledge Base: Financial reports, market data, regulations
Result: AI assistant with financial knowledge

RAG Architecture

Core Components

┌─────────────────────────────────────────────────────────────┐
│                    RAG SYSTEM ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌────────────────────────────────────────────────────┐    │
│  │  1. DOCUMENT INGESTION                             │    │
│  │     • Load documents (PDF, TXT, HTML, etc.)        │    │
│  │     • Parse and extract text                       │    │
│  │     • Clean and normalize                          │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  2. CHUNKING                                       │    │
│  │     • Split documents into chunks                  │    │
│  │     • Typical size: 500-1000 tokens                │    │
│  │     • Maintain context overlap                     │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  3. EMBEDDING GENERATION                           │    │
│  │     • Convert chunks to vectors                    │    │
│  │     • Use embedding model                          │    │
│  │     • Typical dimension: 1536                      │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  4. VECTOR STORAGE                                 │    │
│  │     • Store embeddings in vector DB                │    │
│  │     • Index for fast similarity search             │    │
│  │     • Store metadata (source, page, etc.)          │    │
│  └────────────────────────────────────────────────────┘    │
│                                                              │
│  ┌────────────────────────────────────────────────────┐    │
│  │  5. QUERY PROCESSING                               │    │
│  │     • User asks question                           │    │
│  │     • Convert query to embedding                   │    │
│  │     • Search vector DB                             │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  6. RETRIEVAL                                      │    │
│  │     • Find top-k similar chunks                    │    │
│  │     • Rank by relevance score                      │    │
│  │     • Apply filters (date, source, etc.)           │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  7. AUGMENTATION                                   │    │
│  │     • Build prompt with context                    │    │
│  │     • Format retrieved chunks                      │    │
│  │     • Add instructions                             │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                    │
│                         ▼                                    │
│  ┌────────────────────────────────────────────────────┐    │
│  │  8. GENERATION                                     │    │
│  │     • Send to LLM                                  │    │
│  │     • Generate response                            │    │
│  │     • Include citations                            │    │
│  └────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Data Flow

Indexing Phase (One-time):

Documents → Chunking → Embeddings → Vector DB
   │            │           │            │
   │            │           │            └─ Store for retrieval
   │            │           └─ Convert to vectors
   │            └─ Split into pieces
   └─ Source data

Query Phase (Every request):

User Query → Embedding → Vector Search → Retrieved Docs
                │             │              │
                │             │              └─ Top-k relevant chunks
                │             └─ Find similar vectors
                └─ Convert to vector

Retrieved Docs → Augment Prompt → LLM → Response
      │               │            │        │
      │               │            │        └─ Final answer
      │               │            └─ Generate
      │               └─ Add context
      └─ Relevant information

Architecture Overview

Complete RAG System:

┌──────────────────────────────────────────────────────────────────┐
│                         DATA SOURCES                              │
│  • PDFs  • Word Docs  • Web Pages  • Databases  • APIs          │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                    DOCUMENT PROCESSING                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │   Extract    │→ │    Clean     │→ │    Chunk     │          │
│  │     Text     │  │   Normalize  │  │   Split      │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                    EMBEDDING SERVICE                              │
│              (AWS Bedrock Titan Embeddings)                       │
│                                                                   │
│  Text Chunk → [0.23, -0.45, 0.67, ..., 0.12]  (1536 dimensions) │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                    VECTOR DATABASE                                │
│  (OpenSearch, Pinecone, FAISS, Chroma, etc.)                     │
│                                                                   │
│  ┌────────────────────────────────────────────────────────┐     │
│  │  Chunk 1: [0.23, -0.45, ...] → "Return policy..."     │     │
│  │  Chunk 2: [0.12, 0.89, ...] → "Warranty info..."      │     │
│  │  Chunk 3: [-0.34, 0.56, ...] → "Shipping details..."  │     │
│  │  ...                                                    │     │
│  └────────────────────────────────────────────────────────┘     │
└──────────────────────────────────────────────────────────────────┘
                         │
                         │ (Query Time)
                         │
┌──────────────────────────────────────────────────────────────────┐
│                      USER QUERY                                   │
│              "What is the return policy?"                         │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                  QUERY EMBEDDING                                  │
│  "What is the return policy?" → [0.25, -0.43, 0.69, ...]        │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                  SIMILARITY SEARCH                                │
│  Find vectors closest to query vector                            │
│                                                                   │
│  Results:                                                         │
│  1. Chunk 1 (similarity: 0.95) → "Return policy..."             │
│  2. Chunk 5 (similarity: 0.87) → "Refund process..."            │
│  3. Chunk 9 (similarity: 0.82) → "Exchange policy..."           │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                  PROMPT AUGMENTATION                              │
│                                                                   │
│  Context: [Retrieved chunks]                                     │
│  Question: "What is the return policy?"                          │
│  Instructions: "Answer based on context, cite sources"           │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                    LLM (Claude, etc.)                             │
│              Generate answer with context                         │
└────────────────────────┬─────────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────────┐
│                      RESPONSE                                     │
│  "Items can be returned within 30 days with receipt.             │
│   Refunds processed in 5-7 business days.                        │
│   Source: Return Policy Document, Section 3"                     │
└──────────────────────────────────────────────────────────────────┘

Bedrock RAG Options

AWS Bedrock offers multiple ways to implement RAG:

| Option | Complexity | Control | Use Case |
|--------|------------|---------|----------|
| Knowledge Bases | Low | Low | Quick start, managed solution |
| Custom RAG | High | High | Full customization needed |
| Hybrid | Medium | Medium | Mix managed + custom |

RAG with AWS Bedrock

Option 1: Bedrock Knowledge Bases (Managed)

Fully managed RAG solution - AWS handles everything:

┌─────────────────────────────────────────────────────────────┐
│           BEDROCK KNOWLEDGE BASES (MANAGED)                  │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  AWS Manages:                                               │
│  ✅ Document ingestion                                      │
│  ✅ Chunking strategy                                       │
│  ✅ Embedding generation                                    │
│  ✅ Vector storage (OpenSearch)                             │
│  ✅ Retrieval logic                                         │
│  ✅ Scaling and availability                                │
│                                                              │
│  You Provide:                                               │
│  📄 Documents (S3 bucket)                                   │
│  🔧 Configuration (chunk size, etc.)                        │
│  🤖 Model selection                                         │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Pros: - ✅ Quick setup (minutes) - ✅ No infrastructure management - ✅ Automatic scaling - ✅ Built-in best practices - ✅ Integrated with Agents

Cons: - ❌ Less customization - ❌ Limited choice of vector stores - ❌ Limited chunking options

Best for: - Getting started quickly - Standard use cases - Teams without ML expertise - Integration with Bedrock Agents

Option 2: Custom RAG with Bedrock

Build your own RAG pipeline using Bedrock APIs:

┌─────────────────────────────────────────────────────────────┐
│              CUSTOM RAG WITH BEDROCK                         │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  You Control:                                               │
│  🔧 Document processing                                     │
│  🔧 Chunking strategy                                       │
│  🔧 Vector database choice                                  │
│  🔧 Retrieval algorithm                                     │
│  🔧 Prompt engineering                                      │
│                                                              │
│  Use Bedrock For:                                           │
│  🤖 Embeddings (Titan Embeddings)                           │
│  🤖 Generation (Claude, Titan, etc.)                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Pros: - ✅ Full control - ✅ Any vector database - ✅ Custom chunking - ✅ Advanced retrieval - ✅ Optimized for your use case

Cons: - ❌ More complex - ❌ Manage infrastructure - ❌ Requires ML knowledge - ❌ More code to maintain

Best for: - Advanced use cases - Specific requirements - Existing vector DB - Maximum optimization

Option 3: Hybrid Approach

Combine managed and custom components:

Example 1: Knowledge Bases + Custom Retrieval
- Use KB for storage
- Custom logic for retrieval

Example 2: Custom Embeddings + KB Storage
- Your embedding model
- KB for vector storage

Example 3: KB + Custom Post-Processing
- KB for retrieval
- Custom reranking logic
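
A minimal sketch of Example 3, assuming a Knowledge Base ID of KB123456 and reusing the rerank_results helper defined later in the Custom RAG section:

import boto3

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

def kb_retrieve_then_rerank(query, kb_id='KB123456', top_k=10, final_k=3):
    """Managed retrieval from a Knowledge Base, followed by custom reranking."""
    # 1. Managed retrieval (Bedrock handles embedding and vector search)
    response = bedrock_agent_runtime.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={'text': query},
        retrievalConfiguration={
            'vectorSearchConfiguration': {'numberOfResults': top_k}
        }
    )

    # 2. Custom post-processing: reshape results and apply your own reranking
    candidates = [
        {
            'document': r['content']['text'],
            'metadata': {'source': r['location']['s3Location']['uri']},
            'score': r['score']
        }
        for r in response['retrievalResults']
    ]
    return rerank_results(query, candidates)[:final_k]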

Implementation Approaches

Quick Comparison

# APPROACH 1: Bedrock Knowledge Bases (Managed)
# ============================================
# Setup time: 10 minutes
# Code: ~50 lines

# 1. Create Knowledge Base (Console or API)
kb = bedrock_agent.create_knowledge_base(...)

# 2. Add data source (S3 bucket)
data_source = bedrock_agent.create_data_source(...)

# 3. Sync data
bedrock_agent.start_ingestion_job(...)

# 4. Query
response = bedrock_agent_runtime.retrieve_and_generate(
    input={'text': 'What is the return policy?'},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123',
            'modelArn': 'arn:aws:bedrock:...:claude-3-sonnet'
        }
    }
)

print(response['output']['text'])
# Done! ✅


# APPROACH 2: Custom RAG
# ============================================
# Setup time: 2-3 days
# Code: ~500 lines

# 1. Load documents
docs = load_documents('data/')

# 2. Chunk documents
chunks = chunk_documents(docs, chunk_size=1000)

# 3. Generate embeddings
embeddings = []
for chunk in chunks:
    emb_response = bedrock_runtime.invoke_model(
        modelId='amazon.titan-embed-text-v1',
        body=json.dumps({'inputText': chunk})
    )
    embeddings.append(json.loads(emb_response['body'].read())['embedding'])

# 4. Store in vector DB
vector_db.upsert(embeddings, chunks)

# 5. Query
query_emb = get_embedding(user_query)
results = vector_db.search(query_emb, top_k=5)

# 6. Build prompt
prompt = build_rag_prompt(user_query, results)

# 7. Generate
response = bedrock_runtime.invoke_model(
    modelId='anthropic.claude-3-sonnet-20240229-v1:0',
    body=json.dumps({
        'anthropic_version': 'bedrock-2023-05-31',
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': 1000
    })
)

# Much more code, but full control ✅

Bedrock Knowledge Bases (Managed Approach)

What are Knowledge Bases?

Bedrock Knowledge Bases are fully managed RAG solutions that handle:

┌─────────────────────────────────────────────────────────────┐
│              KNOWLEDGE BASE COMPONENTS                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1. DATA SOURCE                                             │
│     • S3 bucket with documents                              │
│     • Supported: PDF, TXT, MD, HTML, DOC, CSV              │
│     • Automatic monitoring for changes                      │
│                                                              │
│  2. EMBEDDING MODEL                                         │
│     • Titan Embeddings G1 - Text                            │
│     • Titan Embeddings V2                                   │
│     • Cohere Embed models                                   │
│                                                              │
│  3. VECTOR STORE                                            │
│     • Amazon OpenSearch Serverless                          │
│     • Amazon OpenSearch Service                             │
│     • Pinecone                                              │
│     • Redis Enterprise Cloud                                │
│                                                              │
│  4. CHUNKING STRATEGY                                       │
│     • Fixed-size chunking                                   │
│     • Default: 300 tokens                                   │
│     • Configurable overlap                                  │
│                                                              │
│  5. RETRIEVAL CONFIGURATION                                 │
│     • Number of results                                     │
│     • Metadata filtering                                    │
│     • Hybrid search options                                 │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Creating a Knowledge Base

Step-by-Step Guide:

import boto3
import json

bedrock_agent = boto3.client('bedrock-agent')

# Step 1: Create Knowledge Base
kb_response = bedrock_agent.create_knowledge_base(
    name='company-docs-kb',
    description='Company documentation and policies',
    roleArn='arn:aws:iam::ACCOUNT:role/BedrockKBRole',
    knowledgeBaseConfiguration={
        'type': 'VECTOR',
        'vectorKnowledgeBaseConfiguration': {
            'embeddingModelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
        }
    },
    storageConfiguration={
        'type': 'OPENSEARCH_SERVERLESS',
        'opensearchServerlessConfiguration': {
            'collectionArn': 'arn:aws:aoss:us-east-1:ACCOUNT:collection/kb-collection',
            'vectorIndexName': 'bedrock-knowledge-base-index',
            'fieldMapping': {
                'vectorField': 'bedrock-knowledge-base-default-vector',
                'textField': 'AMAZON_BEDROCK_TEXT_CHUNK',
                'metadataField': 'AMAZON_BEDROCK_METADATA'
            }
        }
    }
)

kb_id = kb_response['knowledgeBase']['knowledgeBaseId']
print(f"Knowledge Base created: {kb_id}")

# Step 2: Create Data Source (S3)
ds_response = bedrock_agent.create_data_source(
    knowledgeBaseId=kb_id,
    name='s3-docs-source',
    description='S3 bucket with company documents',
    dataSourceConfiguration={
        'type': 'S3',
        's3Configuration': {
            'bucketArn': 'arn:aws:s3:::my-company-docs',
            'inclusionPrefixes': ['policies/', 'procedures/']
        }
    },
    vectorIngestionConfiguration={
        'chunkingConfiguration': {
            'chunkingStrategy': 'FIXED_SIZE',
            'fixedSizeChunkingConfiguration': {
                'maxTokens': 300,
                'overlapPercentage': 20
            }
        }
    }
)

ds_id = ds_response['dataSource']['dataSourceId']
print(f"Data Source created: {ds_id}")

# Step 3: Start Ingestion Job
ingestion_response = bedrock_agent.start_ingestion_job(
    knowledgeBaseId=kb_id,
    dataSourceId=ds_id
)

job_id = ingestion_response['ingestionJob']['ingestionJobId']
print(f"Ingestion job started: {job_id}")

# Step 4: Wait for ingestion to complete
import time

while True:
    job_status = bedrock_agent.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id,
        ingestionJobId=job_id
    )

    status = job_status['ingestionJob']['status']
    print(f"Ingestion status: {status}")

    if status in ['COMPLETE', 'FAILED']:
        break

    time.sleep(10)

print("Knowledge Base ready!")

IAM Role for Knowledge Base:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-company-docs",
        "arn:aws:s3:::my-company-docs/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel"
      ],
      "Resource": [
        "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v1"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "aoss:APIAccessAll"
      ],
      "Resource": [
        "arn:aws:aoss:us-east-1:ACCOUNT:collection/*"
      ]
    }
  ]
}
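
The role also needs a trust policy that lets the Bedrock service assume it on your behalf (ACCOUNT is a placeholder for your account ID):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "bedrock.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "ACCOUNT"
        }
      }
    }
  ]
}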

Querying Knowledge Bases

Two Query Methods:

Method 1: Retrieve Only (Get Documents)

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

# Just retrieve relevant documents
retrieve_response = bedrock_agent_runtime.retrieve(
    knowledgeBaseId='KB123456',
    retrievalQuery={
        'text': 'What is the return policy for electronics?'
    },
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 5,
            'overrideSearchType': 'HYBRID'  # or 'SEMANTIC'
        }
    }
)

# Process results
for result in retrieve_response['retrievalResults']:
    print(f"Score: {result['score']}")
    print(f"Content: {result['content']['text']}")
    print(f"Source: {result['location']['s3Location']['uri']}")
    print(f"Metadata: {result['metadata']}")
    print("---")

Method 2: Retrieve and Generate (RAG)

# Retrieve + Generate answer
rag_response = bedrock_agent_runtime.retrieve_and_generate(
    input={
        'text': 'What is the return policy for electronics?'
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123456',
            'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0',
            'retrievalConfiguration': {
                'vectorSearchConfiguration': {
                    'numberOfResults': 5
                }
            },
            'generationConfiguration': {
                'promptTemplate': {
                    'textPromptTemplate': '''
                    You are a helpful customer service assistant.

                    Use the following context to answer the question.
                    If you don't know the answer, say so.
                    Always cite your sources.

                    Context:
                    $search_results$

                    Question: $query$

                    Answer:
                    '''
                },
                'inferenceConfig': {
                    'textInferenceConfig': {
                        'temperature': 0.7,
                        'maxTokens': 500
                    }
                }
            }
        }
    }
)

# Get answer
answer = rag_response['output']['text']
print(f"Answer: {answer}")

# Get citations
for citation in rag_response['citations']:
    for reference in citation['retrievedReferences']:
        print(f"Source: {reference['location']['s3Location']['uri']}")
        print(f"Content: {reference['content']['text']}")
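
retrieve_and_generate can also carry conversation state across turns: the response includes a sessionId, which you pass back on follow-up questions so the model sees the earlier exchange. A brief sketch using the same Knowledge Base and model as above:

# First turn: no sessionId supplied, Bedrock creates one
first_turn = bedrock_agent_runtime.retrieve_and_generate(
    input={'text': 'What is the return policy for electronics?'},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123456',
            'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
        }
    }
)
session_id = first_turn['sessionId']

# Follow-up turn: reuse the sessionId to keep conversational context
follow_up = bedrock_agent_runtime.retrieve_and_generate(
    sessionId=session_id,
    input={'text': 'Does that window apply to laptops as well?'},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123456',
            'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
        }
    }
)
print(follow_up['output']['text'])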

With Metadata Filtering:

# Filter by metadata
response = bedrock_agent_runtime.retrieve_and_generate(
    input={'text': 'What are the Q4 results?'},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123456',
            'modelArn': 'arn:aws:bedrock:...:claude-3-sonnet',
            'retrievalConfiguration': {
                'vectorSearchConfiguration': {
                    'numberOfResults': 5,
                    'filter': {
                        'equals': {
                            'key': 'year',
                            'value': '2024'
                        }
                    }
                }
            }
        }
    }
)
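
For filters like this to work with an S3 data source, each document's attributes go in a sidecar file named <document-name>.metadata.json stored next to the document. A small sketch that uploads one (bucket name and attribute names are examples):

import boto3
import json

s3 = boto3.client('s3')

# Attributes for policies/q4-results.pdf; keys become filterable metadata
metadata = {
    'metadataAttributes': {
        'year': '2024',
        'department': 'finance'
    }
}

# The sidecar must sit alongside the source document and include its full name
s3.put_object(
    Bucket='my-company-docs',
    Key='policies/q4-results.pdf.metadata.json',
    Body=json.dumps(metadata)
)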

Integration with Agents

Knowledge Bases work seamlessly with Bedrock Agents:

# Create agent with Knowledge Base
agent_response = bedrock_agent.create_agent(
    agentName='customer-support-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='''
    You are a customer support agent.
    Use the knowledge base to answer questions about:
    - Return policies
    - Product information
    - Shipping details
    - Warranty information

    Always be helpful and cite your sources.
    ''',
    agentResourceRoleArn='arn:aws:iam::ACCOUNT:role/AgentRole'
)

agent_id = agent_response['agent']['agentId']

# Associate Knowledge Base with Agent
bedrock_agent.associate_agent_knowledge_base(
    agentId=agent_id,
    agentVersion='DRAFT',
    knowledgeBaseId='KB123456',
    description='Company policies and product documentation',
    knowledgeBaseState='ENABLED'
)

# Prepare and create alias
bedrock_agent.prepare_agent(agentId=agent_id)

alias_response = bedrock_agent.create_agent_alias(
    agentId=agent_id,
    agentAliasName='production'
)

alias_id = alias_response['agentAlias']['agentAliasId']

# Now agent can use KB automatically
agent_response = bedrock_agent_runtime.invoke_agent(
    agentId=agent_id,
    agentAliasId=alias_id,
    sessionId='session-123',
    inputText='What is the warranty on Model X-2000?'
)

# Agent will:
# 1. Recognize it needs product information
# 2. Query the Knowledge Base
# 3. Use retrieved context to answer
# 4. Cite sources
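
invoke_agent returns an event stream rather than a plain string; the final answer arrives as chunk events that you concatenate:

# Read the streamed completion from the invoke_agent response above
completion = ''
for event in agent_response['completion']:
    if 'chunk' in event:
        completion += event['chunk']['bytes'].decode('utf-8')

print(completion)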

Custom RAG Implementation

Step 1: Document Processing

Load and prepare documents:

import boto3
import json
from pathlib import Path

def load_documents(directory):
    """
    Load documents from directory
    """
    documents = []

    for file_path in Path(directory).rglob('*'):
        if file_path.suffix in ['.txt', '.md', '.pdf']:
            # Extract text based on file type
            if file_path.suffix == '.pdf':
                text = extract_pdf_text(file_path)
            else:
                text = file_path.read_text(encoding='utf-8')

            documents.append({
                'text': text,
                'source': str(file_path),
                'metadata': {
                    'filename': file_path.name,
                    'type': file_path.suffix,
                    'size': file_path.stat().st_size
                }
            })

    return documents

def extract_pdf_text(pdf_path):
    """
    Extract text from PDF
    """
    import PyPDF2

    text = ""
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        for page in pdf_reader.pages:
            text += page.extract_text()

    return text

def clean_text(text):
    """
    Clean and normalize text
    """
    # Remove extra whitespace
    text = ' '.join(text.split())

    # Remove special characters if needed
    # text = re.sub(r'[^\w\s.,!?-]', '', text)

    return text

# Load documents
docs = load_documents('data/company-docs/')
print(f"Loaded {len(docs)} documents")
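
Before generating embeddings, split each loaded document into smaller chunks. A simple word-based helper (word counts are used as a rough stand-in for tokens; see the chunking strategies under Best Practices for alternatives):

def chunk_documents(documents, chunk_size=1000, overlap=200):
    """
    Split each document into overlapping word-based chunks,
    carrying the source metadata along with each chunk.
    """
    chunks = []

    for doc in documents:
        words = doc['text'].split()
        step = max(chunk_size - overlap, 1)

        for start in range(0, len(words), step):
            chunk_text = ' '.join(words[start:start + chunk_size])
            chunks.append({
                'text': chunk_text,
                'source': doc['source'],
                'metadata': doc['metadata']
            })

    return chunks

chunks = chunk_documents(docs)
print(f"Created {len(chunks)} chunks from {len(docs)} documents")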

Step 2: Generate Embeddings

Use Bedrock Titan Embeddings:

def get_embedding(text, model_id='amazon.titan-embed-text-v1'):
    """
    Generate embedding for text using Bedrock
    """
    bedrock_runtime = boto3.client('bedrock-runtime')

    # Prepare request
    body = json.dumps({
        'inputText': text
    })

    # Call Bedrock
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=body,
        contentType='application/json',
        accept='application/json'
    )

    # Parse response
    response_body = json.loads(response['body'].read())
    embedding = response_body['embedding']

    return embedding

def get_embeddings_batch(texts, batch_size=25):
    """
    Generate embeddings for multiple texts
    """
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        for text in batch:
            emb = get_embedding(text)
            embeddings.append(emb)

        print(f"Processed {min(i + batch_size, len(texts))}/{len(texts)} texts")

    return embeddings

# Example usage
text = "Our return policy allows 30-day returns for all electronics."
embedding = get_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 for Titan v1
print(f"First 5 values: {embedding[:5]}")

Using Titan Embeddings V2:

def get_embedding_v2(text, dimensions=1024):
    """
    Use Titan Embeddings V2 with configurable dimensions
    """
    bedrock_runtime = boto3.client('bedrock-runtime')

    body = json.dumps({
        'inputText': text,
        'dimensions': dimensions,  # 256, 512, or 1024
        'normalize': True
    })

    response = bedrock_runtime.invoke_model(
        modelId='amazon.titan-embed-text-v2:0',
        body=body,
        contentType='application/json',
        accept='application/json'
    )

    response_body = json.loads(response['body'].read())
    return response_body['embedding']
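
To see how retrieval will behave, you can compare embeddings directly with cosine similarity; related texts should score noticeably higher than unrelated ones (exact values depend on the model):

import numpy as np

def cosine_similarity(a, b):
    """Cosine similarity between two embedding vectors."""
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

emb_policy = get_embedding("Electronics can be returned within 30 days.")
emb_related = get_embedding("What is the return window for a laptop?")
emb_unrelated = get_embedding("The office cafeteria opens at 8am.")

print(cosine_similarity(emb_policy, emb_related))    # relatively high
print(cosine_similarity(emb_policy, emb_unrelated))  # relatively low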

Step 3: Store in Vector Database

Option A: FAISS (Local/Simple)

import faiss
import numpy as np
import pickle

class FAISSVectorStore:
    def __init__(self, dimension=1536):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)
        self.documents = []
        self.metadata = []

    def add(self, embeddings, documents, metadata=None):
        """
        Add embeddings to index
        """
        embeddings_array = np.array(embeddings).astype('float32')
        self.index.add(embeddings_array)
        self.documents.extend(documents)

        if metadata:
            self.metadata.extend(metadata)

    def search(self, query_embedding, top_k=5):
        """
        Search for similar vectors
        """
        query_array = np.array([query_embedding]).astype('float32')
        distances, indices = self.index.search(query_array, top_k)

        results = []
        for i, idx in enumerate(indices[0]):
            if idx < len(self.documents):
                results.append({
                    'document': self.documents[idx],
                    'metadata': self.metadata[idx] if self.metadata else {},
                    'distance': float(distances[0][i]),
                    'score': 1 / (1 + float(distances[0][i]))  # Convert to similarity
                })

        return results

    def save(self, path):
        """
        Save index to disk
        """
        faiss.write_index(self.index, f"{path}/index.faiss")
        with open(f"{path}/documents.pkl", 'wb') as f:
            pickle.dump({'documents': self.documents, 'metadata': self.metadata}, f)

    def load(self, path):
        """
        Load index from disk
        """
        self.index = faiss.read_index(f"{path}/index.faiss")
        with open(f"{path}/documents.pkl", 'rb') as f:
            data = pickle.load(f)
            self.documents = data['documents']
            self.metadata = data['metadata']

# Usage
vector_store = FAISSVectorStore(dimension=1536)

# Add documents
chunks = ["chunk 1 text", "chunk 2 text", "chunk 3 text"]
embeddings = get_embeddings_batch(chunks)
metadata = [{'source': 'doc1.pdf'}, {'source': 'doc1.pdf'}, {'source': 'doc2.pdf'}]

vector_store.add(embeddings, chunks, metadata)

# Search
query = "What is the return policy?"
query_emb = get_embedding(query)
results = vector_store.search(query_emb, top_k=3)

for result in results:
    print(f"Score: {result['score']:.3f}")
    print(f"Text: {result['document']}")
    print(f"Source: {result['metadata']['source']}")
    print("---")

# Save for later use
vector_store.save('vector_store')
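
IndexFlatL2 ranks by Euclidean distance. If you prefer cosine similarity (what most hosted vector stores report), a common pattern is to L2-normalize the vectors and use an inner-product index; a brief variant:

import faiss
import numpy as np

# Cosine similarity via normalized vectors + inner-product index
dimension = 1536
ip_index = faiss.IndexFlatIP(dimension)

vectors = np.array(embeddings).astype('float32')
faiss.normalize_L2(vectors)              # in-place L2 normalization
ip_index.add(vectors)

query_vec = np.array([get_embedding("What is the return policy?")]).astype('float32')
faiss.normalize_L2(query_vec)
scores, indices = ip_index.search(query_vec, 3)  # scores are cosine similarities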

Option B: Pinecone (Cloud)

import pinecone

# Initialize Pinecone (legacy pinecone-client v2 style; newer SDK versions use pinecone.Pinecone(api_key=...))
pinecone.init(
    api_key='YOUR_API_KEY',
    environment='us-west1-gcp'
)

# Create index
index_name = 'company-docs'
if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        name=index_name,
        dimension=1536,
        metric='cosine'
    )

index = pinecone.Index(index_name)

# Add vectors
def add_to_pinecone(chunks, embeddings, metadata):
    """
    Add vectors to Pinecone
    """
    vectors = []
    for i, (chunk, emb, meta) in enumerate(zip(chunks, embeddings, metadata)):
        vectors.append({
            'id': f'doc_{i}',
            'values': emb,
            'metadata': {
                'text': chunk,
                **meta
            }
        })

    # Upsert in batches
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)

# Query
def query_pinecone(query_text, top_k=5):
    """
    Query Pinecone index
    """
    query_emb = get_embedding(query_text)

    results = index.query(
        vector=query_emb,
        top_k=top_k,
        include_metadata=True
    )

    return results['matches']

# Usage
add_to_pinecone(chunks, embeddings, metadata)
results = query_pinecone("What is the return policy?", top_k=3)

for match in results:
    print(f"Score: {match['score']:.3f}")
    print(f"Text: {match['metadata']['text']}")
    print("---")

Option C: OpenSearch

from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3

# AWS credentials
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    'us-east-1',
    'es',
    session_token=credentials.token
)

# Connect to OpenSearch
client = OpenSearch(
    hosts=[{'host': 'your-opensearch-endpoint.us-east-1.es.amazonaws.com', 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Create index with vector field
index_name = 'company-docs'
index_body = {
    'settings': {
        'index': {
            'knn': True,
            'knn.algo_param.ef_search': 100
        }
    },
    'mappings': {
        'properties': {
            'text': {'type': 'text'},
            'embedding': {
                'type': 'knn_vector',
                'dimension': 1536,
                'method': {
                    'name': 'hnsw',
                    'space_type': 'cosinesimil',
                    'engine': 'nmslib'
                }
            },
            'metadata': {'type': 'object'}
        }
    }
}

if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body=index_body)

# Add documents
def add_to_opensearch(chunks, embeddings, metadata):
    """
    Add documents to OpenSearch
    """
    for i, (chunk, emb, meta) in enumerate(zip(chunks, embeddings, metadata)):
        doc = {
            'text': chunk,
            'embedding': emb,
            'metadata': meta
        }
        client.index(index=index_name, id=str(i), body=doc)

# Query
def query_opensearch(query_text, top_k=5):
    """
    Query OpenSearch with vector similarity
    """
    query_emb = get_embedding(query_text)

    query_body = {
        'size': top_k,
        'query': {
            'knn': {
                'embedding': {
                    'vector': query_emb,
                    'k': top_k
                }
            }
        }
    }

    response = client.search(index=index_name, body=query_body)

    results = []
    for hit in response['hits']['hits']:
        results.append({
            'text': hit['_source']['text'],
            'metadata': hit['_source']['metadata'],
            'score': hit['_score']
        })

    return results

# Usage
add_to_opensearch(chunks, embeddings, metadata)
results = query_opensearch("What is the return policy?", top_k=3)

Step 4: Retrieval

Implement retrieval with reranking:

def retrieve_with_reranking(query, top_k=5, rerank_top_k=3):
    """
    Retrieve documents with optional reranking
    """
    # Step 1: Initial retrieval (get more than needed)
    query_emb = get_embedding(query)
    initial_results = vector_store.search(query_emb, top_k=top_k * 2)

    # Step 2: Rerank using cross-encoder or LLM
    reranked = rerank_results(query, initial_results)

    # Step 3: Return top results
    return reranked[:rerank_top_k]

def rerank_results(query, results):
    """
    Rerank results using LLM
    """
    # Simple reranking: ask LLM to score relevance
    bedrock_runtime = boto3.client('bedrock-runtime')

    scored_results = []
    for result in results:
        prompt = f"""
        Query: {query}
        Document: {result['document']}

        On a scale of 0-10, how relevant is this document to the query?
        Respond with only a number.
        """

        response = bedrock_runtime.invoke_model(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            body=json.dumps({
                'anthropic_version': 'bedrock-2023-05-31',
                'messages': [{'role': 'user', 'content': prompt}],
                'max_tokens': 10,
                'temperature': 0
            })
        )

        response_body = json.loads(response['body'].read())
        score = float(response_body['content'][0]['text'].strip())

        result['rerank_score'] = score
        scored_results.append(result)

    # Sort by rerank score
    scored_results.sort(key=lambda x: x['rerank_score'], reverse=True)

    return scored_results

Hybrid Search (Vector + Keyword):

def hybrid_search(query, top_k=5, alpha=0.5):
    """
    Combine vector search with keyword search
    alpha: weight for vector search (1-alpha for keyword)
    """
    # Vector search
    query_emb = get_embedding(query)
    vector_results = vector_store.search(query_emb, top_k=top_k * 2)

    # Keyword search (simple BM25)
    keyword_results = keyword_search(query, top_k=top_k * 2)

    # Combine scores
    combined = {}

    for result in vector_results:
        doc_id = result['metadata'].get('id', result['document'])  # fall back to text as key
        combined[doc_id] = {
            'document': result['document'],
            'metadata': result['metadata'],
            'score': alpha * result['score']
        }

    for result in keyword_results:
        doc_id = result['metadata'].get('id', result['document'])  # fall back to text as key
        if doc_id in combined:
            combined[doc_id]['score'] += (1 - alpha) * result['score']
        else:
            combined[doc_id] = {
                'document': result['document'],
                'metadata': result['metadata'],
                'score': (1 - alpha) * result['score']
            }

    # Sort by combined score
    results = sorted(combined.values(), key=lambda x: x['score'], reverse=True)

    return results[:top_k]
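
The hybrid_search above assumes a keyword_search helper. A minimal stand-in that scores chunks by query-term overlap over the in-memory FAISS store (a production setup would use proper BM25, e.g. via OpenSearch or the rank_bm25 package):

def keyword_search(query, top_k=5):
    """
    Very simple keyword scorer: fraction of query terms
    that appear in each stored chunk.
    """
    query_terms = set(query.lower().split())
    scored = []

    for doc, meta in zip(vector_store.documents, vector_store.metadata):
        doc_terms = set(doc.lower().split())
        overlap = len(query_terms & doc_terms)
        if overlap == 0:
            continue

        scored.append({
            'document': doc,
            'metadata': meta,
            'score': overlap / len(query_terms)
        })

    scored.sort(key=lambda x: x['score'], reverse=True)
    return scored[:top_k]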

Step 5: Generation

Build RAG prompt and generate:

def build_rag_prompt(query, retrieved_docs):
    """
    Build prompt with retrieved context
    """
    context = "\n\n".join([
        f"Document {i+1} (Source: {doc['metadata'].get('source', 'Unknown')}):\n{doc['document']}"
        for i, doc in enumerate(retrieved_docs)
    ])

    prompt = f"""
    You are a helpful assistant. Use the following context to answer the question.
    If the answer is not in the context, say "I don't have enough information to answer that."
    Always cite which document(s) you used.

    Context:
    {context}

    Question: {query}

    Answer:
    """

    return prompt

def generate_rag_response(query, retrieved_docs):
    """
    Generate response using RAG
    """
    bedrock_runtime = boto3.client('bedrock-runtime')

    # Build prompt
    prompt = build_rag_prompt(query, retrieved_docs)

    # Call Claude
    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [
                {
                    'role': 'user',
                    'content': prompt
                }
            ],
            'max_tokens': 1000,
            'temperature': 0.7
        })
    )

    response_body = json.loads(response['body'].read())
    answer = response_body['content'][0]['text']

    return answer

# Complete RAG pipeline
def rag_query(query, top_k=3):
    """
    Complete RAG query pipeline
    """
    # 1. Retrieve
    retrieved_docs = retrieve_with_reranking(query, top_k=top_k)

    # 2. Generate
    answer = generate_rag_response(query, retrieved_docs)

    # 3. Return with sources
    return {
        'answer': answer,
        'sources': [
            {
                'text': doc['document'],
                'source': doc['metadata'].get('source'),
                'score': doc['score']
            }
            for doc in retrieved_docs
        ]
    }

# Usage
result = rag_query("What is the return policy for electronics?")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for source in result['sources']:
    print(f"- {source['source']} (score: {source['score']:.3f})")

Streaming Response:

def generate_rag_response_streaming(query, retrieved_docs):
    """
    Generate streaming response
    """
    bedrock_runtime = boto3.client('bedrock-runtime')

    prompt = build_rag_prompt(query, retrieved_docs)

    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [{'role': 'user', 'content': prompt}],
            'max_tokens': 1000,
            'temperature': 0.7
        })
    )

    # Stream response
    for event in response['body']:
        chunk = json.loads(event['chunk']['bytes'])

        if chunk['type'] == 'content_block_delta':
            if 'delta' in chunk and 'text' in chunk['delta']:
                yield chunk['delta']['text']

# Usage
print("Answer: ", end='', flush=True)
for chunk in generate_rag_response_streaming(query, retrieved_docs):
    print(chunk, end='', flush=True)
print()

Vector Databases for RAG

Comparison of Vector Database Options:

Database Type Pros Cons Best For
FAISS Local Fast, free, simple No persistence, single machine Development, small scale
Pinecone Cloud Managed, scalable Cost, vendor lock-in Production, easy setup
OpenSearch Self-hosted/Cloud Full-text + vector, AWS native Complex setup AWS environments
Chroma Local/Cloud Simple API, open source Newer, less mature Development, prototyping
Weaviate Self-hosted/Cloud Feature-rich, GraphQL Complex Advanced use cases
Qdrant Self-hosted/Cloud Fast, Rust-based Smaller community Performance-critical

Choosing a Vector Database:

Decision Tree:

Are you just prototyping?
├─ Yes → Use FAISS (simple, local)
└─ No → Continue

Do you want fully managed?
├─ Yes → Use Pinecone or AWS OpenSearch Serverless
└─ No → Continue

Already using AWS?
├─ Yes → Use OpenSearch (integrates well)
└─ No → Continue

Need hybrid search (vector + keyword)?
├─ Yes → Use OpenSearch or Weaviate
└─ No → Use Qdrant or Chroma

Need maximum performance?
└─ Use Qdrant or FAISS with custom infrastructure

Embeddings Models

Available Bedrock Embedding Models:

# Titan Embeddings G1 - Text
model_id = 'amazon.titan-embed-text-v1'
# - Dimension: 1536 (fixed)
# - Max input: 8192 tokens
# - Cost: $0.0001 per 1K tokens
# - Best for: General purpose

# Titan Embeddings V2
model_id = 'amazon.titan-embed-text-v2:0'
# - Dimensions: 256, 512, or 1024 (configurable)
# - Max input: 8192 tokens
# - Normalization option
# - Better performance than V1

# Cohere Embed English
model_id = 'cohere.embed-english-v3'
# - Dimension: 1024
# - Max input: 512 tokens
# - Optimized for English

# Cohere Embed Multilingual
model_id = 'cohere.embed-multilingual-v3'
# - Dimension: 1024
# - Max input: 512 tokens
# - Supports 100+ languages

Choosing an Embedding Model:

def choose_embedding_model(use_case):
    """
    Recommendation based on use case
    """
    recommendations = {
        'general': 'amazon.titan-embed-text-v2:0',
        'multilingual': 'cohere.embed-multilingual-v3',
        'english_only': 'cohere.embed-english-v3',
        'cost_sensitive': 'amazon.titan-embed-text-v2:0',  # Smaller dimensions
        'high_accuracy': 'amazon.titan-embed-text-v1'  # Larger dimensions
    }

    return recommendations.get(use_case, 'amazon.titan-embed-text-v2:0')

Best Practices

1. Chunking Strategy

Optimal chunk size depends on your use case:

Small chunks (200-300 tokens) - Pros: Precise retrieval, less noise - Cons: May lose context, need more chunks - Best for: Q&A, fact lookup

Medium chunks (500-1000 tokens) - Pros: Good balance, maintains context - Cons: None for most cases - Best for: Most use cases, general documents

Large chunks (1500-2000 tokens) - Pros: More context, fewer chunks - Cons: Less precise, more noise - Best for: Long-form content, summaries

# Smart chunking with overlap
def smart_chunk(text, chunk_size=1000, overlap=200):
    """
    Chunk text with overlap to maintain context
    """
    words = text.split()
    chunks = []

    for i in range(0, len(words), chunk_size - overlap):
        chunk_text = ' '.join(words[i:i + chunk_size])
        chunks.append(chunk_text)

    return chunks

# Semantic chunking (better)
def semantic_chunking(text):
    """
    Chunk by semantic boundaries (paragraphs, sections)
    """
    # Split by double newlines (paragraphs)
    paragraphs = text.split('\n\n')

    chunks = []
    current_chunk = ""

    for para in paragraphs:
        if len(current_chunk) + len(para) < 1000:
            current_chunk += para + "\n\n"
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = para

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks

2. Metadata Management

Store rich metadata for better filtering:

from datetime import datetime
from pathlib import Path

def create_document_with_metadata(text, source_file):
    """
    Create document with comprehensive metadata
    """
    return {
        'text': text,
        'metadata': {
            'source': source_file,
            'filename': Path(source_file).name,
            'type': Path(source_file).suffix,
            'created_at': datetime.now().isoformat(),
            'word_count': len(text.split()),
            'char_count': len(text),
            # Domain-specific metadata
            'version': '1',
            'department': 'HR',
            'category': 'policy',
            'last_updated': '2024-01-15',
            'author': 'HR Team',
            'tags': ['return', 'policy', 'electronics']
        }
    }

# Query with metadata filters
def query_with_filters(query, filters):
    """
    Query with metadata filtering
    """
    results = vector_store.search(query, top_k=20)

    # Apply filters
    filtered = []
    for result in results:
        meta = result['metadata']

        if filters.get('department') and meta.get('department') != filters['department']:
            continue
        if filters.get('category') and meta.get('category') != filters['category']:
            continue
        if filters.get('min_date') and meta.get('last_updated') < filters.get('min_date'):
            continue

        filtered.append(result)

    return filtered[:5]

# Usage
results = query_with_filters(
    "What is the return policy?",
    filters={
        'department': 'HR',
        'category': 'policy',
        'min_date': '2024-01-01'
    }
)

3. Query Optimization

Enhance user queries for better retrieval:

def optimize_query(user_query):
    """
    Enhance user query for better retrieval
    """
    bedrock_runtime = boto3.client('bedrock-runtime')

    prompt = f"""Rewrite this user query to be more specific and effective for document search.
    Add relevant keywords and context.

    Original query: {user_query}

    Optimized query:"""

    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [{
                'role': 'user',
                'content': prompt
            }],
            'max_tokens': 200,
            'temperature': 0
        })
    )

    response_body = json.loads(response['body'].read())
    optimized = response_body['content'][0]['text'].strip()

    return optimized

# Query expansion
def expand_query(query):
    """
    Generate multiple query variations
    """
    variations = [
        query,
        f"What is {query}",
        f"Details about {query}?",
        f"Information on {query}",
    ]

    # Get embeddings for all variations
    embeddings = [get_embedding(q) for q in variations]

    # Average embeddings
    avg_embedding = np.mean(embeddings, axis=0).tolist()

    return avg_embedding

4. Response Quality

Ensure high-quality responses with validation:

def generate_with_quality_checks(query, retrieved_docs):
    """
    Generate response with quality validation
    """
    # Check if retrieved docs are relevant
    if not retrieved_docs or retrieved_docs[0]['score'] < 0.7:
        return {
            'answer': "I don't have enough relevant information to answer that question confidently.",
            'confidence': 'low',
            'sources': []
        }

    # Generate response
    answer = generate_rag_response(query, retrieved_docs)

    # Validate response
    if "I don't know" in answer or "not sure" in answer or "cannot" in answer:
        confidence = 'low'
    elif len(retrieved_docs) >= 3 and retrieved_docs[0]['score'] > 0.8:
        confidence = 'high'
    else:
        confidence = 'medium'

    return {
        'answer': answer,
        'confidence': confidence,
        'sources': [doc['metadata']['source'] for doc in retrieved_docs]
    }

# Add inline citations
def add_citations(answer, sources):
    """
    Add inline citations to answer
    """
    # Simple citation format
    cited_answer = answer

    for i, source in enumerate(sources, 1):
        source_name = source['metadata'].get('filename', 'Unknown')
        cited_answer += f"\n\n[{i}] {source_name}"

    return cited_answer

5. Monitoring and Evaluation

Track RAG system performance:

class RAGMetrics:
    def __init__(self):
        self.queries = []

    def log_query(self, query, results, response_time):
        """
        Log query metrics
        """
        self.queries.append({
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'num_results': len(results),
            'top_score': results[0]['score'] if results else 0,
            'response_time': response_time
        })

    def get_stats(self):
        """
        Get performance statistics
        """
        if not self.queries:
            return {}

        return {
            'total_queries': len(self.queries),
            'avg_response_time': np.mean([q['response_time'] for q in self.queries]),
            'avg_top_score': np.mean([q['top_score'] for q in self.queries]),
            'low_confidence_queries': len([q for q in self.queries if q['top_score'] < 0.7])
        }

# Usage
metrics = RAGMetrics()

def rag_query_with_metrics(query):
    """
    RAG query with performance tracking
    """
    start_time = time.time()

    # Retrieve
    results = retrieve_with_reranking(query)

    # Generate
    answer = generate_rag_response(query, results)

    response_time = time.time() - start_time

    # Log metrics
    metrics.log_query(query, results, response_time)

    return answer

# Periodic evaluation
def evaluate_rag_quality(test_queries):
    """
    Evaluate RAG system quality
    """
    results = []

    for query, expected_answer in test_queries:
        answer = rag_query(query)

        # Compare (simplified)
        similarity = calculate_similarity(answer['answer'], expected_answer)

        results.append({
            'query': query,
            'similarity': similarity,
            'sources_used': len(answer['sources'])
        })

    avg_similarity = np.mean([r['similarity'] for r in results])
    pass_rate = len([r for r in results if r['similarity'] > 0.8]) / len(results)

    return {
        'avg_similarity': avg_similarity,
        'pass_rate': pass_rate
    }
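
evaluate_rag_quality assumes a calculate_similarity helper that is not defined in this guide; a minimal sketch using cosine similarity between embeddings (get_embedding is the embedding helper used in earlier snippets):

import numpy as np

def calculate_similarity(text_a, text_b):
    """
    Cosine similarity between two texts' embeddings (a rough proxy for answer match)
    """
    emb_a = np.array(get_embedding(text_a))
    emb_b = np.array(get_embedding(text_b))

    return float(np.dot(emb_a, emb_b) / (np.linalg.norm(emb_a) * np.linalg.norm(emb_b)))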

Optimization Techniques

1. Caching

Cache embeddings and results:

from functools import lru_cache
import hashlib

class EmbeddingCache:
    def __init__(self):
        self.cache = {}

    def get_cache_key(self, text):
        """
        Generate cache key from text
        """
        return hashlib.md5(text.encode()).hexdigest()

    def get_embedding(self, text):
        """
        Get embedding with caching
        """
        cache_key = self.get_cache_key(text)

        if cache_key in self.cache:
            return self.cache[cache_key]

        # Generate embedding
        embedding = generate_embedding(text)

        # Cache it
        self.cache[cache_key] = embedding

        return embedding

# Query result caching
@lru_cache(maxsize=1000)
def cached_rag_query(query):
    """
    Cache RAG query results
    """
    return rag_query(query)

2. Batch Processing

Process multiple queries efficiently:

def batch_rag_queries(queries, batch_size=10):
    """
    Process multiple queries in batches
    """
    results = []

    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]

        # Get embeddings in batch
        embeddings = get_embeddings_batch(batch)

        # Retrieve for each
        batch_results = []
        for query, emb in zip(batch, embeddings):
            retrieved = vector_store.search(emb, top_k=3)
            answer = generate_rag_response(query, retrieved)
            batch_results.append(answer)

        results.extend(batch_results)

    return results
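
batch_rag_queries assumes a get_embeddings_batch helper. A minimal sketch, assuming Cohere Embed on Bedrock (which accepts a list of texts per request, unlike Titan, which embeds one text per call):

import boto3
import json

def get_embeddings_batch(texts, batch_size=25):
    """
    Embed a list of texts, sending up to batch_size texts per request
    """
    bedrock_runtime = boto3.client('bedrock-runtime')
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        response = bedrock_runtime.invoke_model(
            modelId='cohere.embed-english-v3',
            body=json.dumps({
                'texts': batch,
                'input_type': 'search_document'  # use 'search_query' when embedding queries
            })
        )

        embeddings.extend(json.loads(response['body'].read())['embeddings'])

    return embeddings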

3. Async Processing

Handle concurrent requests:

import asyncio
import aioboto3

async def async_get_embedding(text, session):
    """
    Async embedding generation
    """
    async with session.client('bedrock-runtime') as client:
        response = await client.invoke_model(
            modelId='amazon.titan-embed-text-v1',
            body=json.dumps({
                'inputText': text
            })
        )

        body = await response['body'].read()
        return json.loads(body)['embedding']

async def async_rag_query(query):
    """
    Async RAG query
    """
    session = aioboto3.Session()

    # Get query embedding
    query_emb = await async_get_embedding(query, session)

    # Search (assuming async vector store)
    results = await vector_store.async_search(query_emb)

    # Generate
    answer = await async_generate_response(query, results)

    return answer

async def process_queries_concurrent(queries):
    """
    Process multiple queries concurrently
    """
    tasks = [async_rag_query(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

# Usage
queries = ["query 1", "query 2", "query 3"]
results = asyncio.run(process_queries_concurrent(queries))
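
The async flow above also assumes an async_generate_response helper; a minimal sketch using aioboto3 with Claude 3 Haiku, reusing the imports at the top of this subsection (the model choice and prompt are assumptions):

async def async_generate_response(query, results):
    """
    Generate an answer from retrieved docs with an async Bedrock client
    """
    context = "\n\n".join(doc['text'] for doc in results)
    prompt = f"Use this context to answer the question:\n\n{context}\n\nQuestion: {query}"

    session = aioboto3.Session()
    async with session.client('bedrock-runtime') as client:
        response = await client.invoke_model(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            body=json.dumps({
                'anthropic_version': 'bedrock-2023-05-31',
                'messages': [{'role': 'user', 'content': prompt}],
                'max_tokens': 1000
            })
        )

        body = await response['body'].read()
        return json.loads(body)['content'][0]['text']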

4. Vector Index Optimization

Optimize FAISS index for speed:

import faiss

def create_optimized_index(dimension=1536, num_clusters=100):
    """
    Create optimized FAISS index with IVF (Inverted File Index)
    """
    # Quantizer
    quantizer = faiss.IndexFlatL2(dimension)

    # IVF index
    index = faiss.IndexIVFFlat(quantizer, dimension, num_clusters)

    return index

def train_and_add(index, embeddings):
    """
    Train index and add vectors
    """
    embeddings_array = np.array(embeddings).astype('float32')

    # Train
    index.train(embeddings_array)

    # Add vectors
    index.add(embeddings_array)

    return index

def optimized_search(index, query_embedding, top_k=5, nprobe=10):
    """
    Search with optimized parameters
    """
    # Set nprobe (number of clusters to search)
    index.nprobe = nprobe

    query_array = np.array([query_embedding]).astype('float32')
    distances, indices = index.search(query_array, top_k)

    return indices, distances
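
Putting the three helpers together (embeddings and query_embedding are assumed to come from the earlier embedding steps; IVF training needs at least num_clusters vectors):

# Usage
index = create_optimized_index(dimension=1536, num_clusters=100)
index = train_and_add(index, embeddings)
indices, distances = optimized_search(index, query_embedding, top_k=5, nprobe=10)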

Complete Examples

Example 1: Simple RAG System

"""
Complete simple RAG system
"""
import boto3
import json
import faiss
import numpy as np
from pathlib import Path

class SimpleRAG:
    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')
        self.documents = []
        self.vector_store = None

    def load_documents(self, directory):
        """
        Load documents from directory
        """
        for file_path in Path(directory).glob('*.txt'):
            text = file_path.read_text()
            self.documents.append({
                'text': text,
                'source': str(file_path)
            })

        print(f"Loaded {len(self.documents)} documents")

    def chunk_documents(self, chunk_size=1000):
        """
        Chunk documents
        """
        chunks = []

        for doc in self.documents:
            words = doc['text'].split()

            for i in range(0, len(words), chunk_size):
                chunk_text = ' '.join(words[i:i + chunk_size])
                chunks.append({
                    'text': chunk_text,
                    'source': doc['source']
                })

        self.documents = chunks
        print(f"Created {len(chunks)} chunks")

    def create_embeddings(self):
        """
        Generate embeddings for all chunks
        """
        embeddings = []

        for i, doc in enumerate(self.documents):
            if (i + 1) % 10 == 0:
                print(f"Processed {i + 1}/{len(self.documents)}")

            response = self.bedrock.invoke_model(
                modelId='amazon.titan-embed-text-v1',
                body=json.dumps({
                    'inputText': doc['text']
                })
            )

            emb = json.loads(response['body'].read())['embedding']
            embeddings.append(emb)

        # Create FAISS index
        embeddings_array = np.array(embeddings).astype('float32')
        self.vector_store = faiss.IndexFlatL2(len(embeddings[0]))
        self.vector_store.add(embeddings_array)

        print("Vector store created")

    def retrieve(self, query, top_k=3):
        """
        Retrieve relevant documents
        """
        # Get query embedding
        response = self.bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v1',
            body=json.dumps({'inputText': query})
        )

        query_emb = json.loads(response['body'].read())['embedding']

        # Search
        query_array = np.array([query_emb]).astype('float32')
        distances, indices = self.vector_store.search(query_array, top_k)

        # Get documents (FAISS pads with -1 when fewer than top_k results exist)
        results = []
        for idx in indices[0]:
            if idx == -1:
                continue
            results.append(self.documents[idx])

        return results

    def generate(self, query, context_docs):
        """
        Generate answer with context
        """
        # Build prompt
        context = "\n\n".join([
            f"Document {i+1}:\n{doc['text']}"
            for i, doc in enumerate(context_docs)
        ])

        prompt = f"""Use this context to answer the question:

Context:
{context}

Question: {query}

Answer the question based on the provided context. Include citations."""

        response = self.bedrock.invoke_model(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',
            body=json.dumps({
                'anthropic_version': 'bedrock-2023-05-31',
                'messages': [{
                    'role': 'user',
                    'content': prompt
                }],
                'max_tokens': 1000
            })
        )

        answer = json.loads(response['body'].read())['content'][0]['text']

        return answer

    def query(self, question):
        """
        Complete RAG query
        """
        # Retrieve
        docs = self.retrieve(question, top_k=3)

        # Generate
        answer = self.generate(question, docs)

        return {
            'answer': answer,
            'sources': [doc['source'] for doc in docs]
        }

# Usage
rag = SimpleRAG()
rag.load_documents('data/company-docs/')
rag.chunk_documents(chunk_size=1000)
rag.create_embeddings()

# Query
result = rag.query("What is the return policy?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")

Example 2: Production RAG with Knowledge Bases

"""
Production RAG using Bedrock Knowledge Bases
"""
import boto3
import json

class ProductionRAG:
    def __init__(self, kb_id, model_arn):
        self.kb_id = kb_id
        self.model_arn = model_arn
        self.client = boto3.client('bedrock-agent-runtime')

    def query(self, question, filters=None, top_k=5):
        """
        Query with optional metadata filtering
        """
        config = {
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': self.kb_id,
                'modelArn': self.model_arn,
                'retrievalConfiguration': {
                    'vectorSearchConfiguration': {
                        'numberOfResults': top_k
                    }
                }
            }
        }

        # Add filters if provided
        if filters:
            config['knowledgeBaseConfiguration']['retrievalConfiguration']['vectorSearchConfiguration']['filter'] = filters

        response = self.client.retrieve_and_generate(
            input={'text': question},
            retrieveAndGenerateConfiguration=config
        )

        return {
            'answer': response['output']['text'],
            'citations': response.get('citations', []),
            'session_id': response.get('sessionId')
        }

    def multi_turn_conversation(self, session_id, question):
        """
        Continue conversation with context
        """
        response = self.client.retrieve_and_generate(
            input={'text': question},
            retrieveAndGenerateConfiguration={
                'type': 'KNOWLEDGE_BASE',
                'knowledgeBaseConfiguration': {
                    'knowledgeBaseId': self.kb_id,
                    'modelArn': self.model_arn
                }
            },
            sessionId=session_id
        )

        return {
            'answer': response['output']['text'],
            'citations': response.get('citations', [])
        }

# Usage
rag = ProductionRAG(
    kb_id='KB123456',
    model_arn='arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
)

# Single query
result = rag.query("What is the return policy?")
print(f"Answer: {result['answer']}")

# With filters
result = rag.query(
    "What are the Q4 results?",
    filters={'equals': {'key': 'year', 'value': '2024'}}
)

# Multi-turn conversation
session_id = result.get('session_id')
followup = rag.multi_turn_conversation(session_id, "What about Q3?")

Troubleshooting

Common Issues and Solutions

Issue 1: Low Retrieval Quality

Problem: Retrieved documents not relevant

Solutions:

1. Adjust chunk size

# Try smaller chunks
chunks = chunk_documents(docs, chunk_size=500)

2. Use better embeddings

# Use Titan Embeddings V2
embedding_v2 = get_embedding_v2(text, dimensions=1024)  # Instead of 1536

3. Add query optimization

optimized_query = optimize_query(user_query)
results = retrieve(optimized_query)

4. Use hybrid search (see the sketch below)

# Combine vector search + keyword
results = hybrid_search(query, alpha=0.7)  # 70% vector + 30% keyword
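
hybrid_search is not defined elsewhere in this guide; a hedged sketch of the idea, assuming the vector store returns results with 'text' and a normalized 'score' and using simple keyword overlap for the lexical half:

def hybrid_search(query, alpha=0.7, top_k=5):
    """
    Blend vector similarity with a simple keyword-overlap score
    """
    vector_results = vector_store.search(query, top_k=20)  # assumed vector store API from earlier snippets
    query_terms = set(query.lower().split())

    scored = []
    for result in vector_results:
        doc_terms = set(result['text'].lower().split())
        keyword_score = len(query_terms & doc_terms) / max(len(query_terms), 1)
        combined = alpha * result['score'] + (1 - alpha) * keyword_score
        scored.append({**result, 'score': combined})

    scored.sort(key=lambda r: r['score'], reverse=True)
    return scored[:top_k]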

Issue 2: Slow Performance

Problem: Queries taking too long

Solutions:

1. Use caching

cache = EmbeddingCache()
embedding = cache.get_embedding(text)

2. Optimize index

# Create optimized FAISS index
index = create_optimized_index(dimension=1536, num_clusters=100)

3. Reduce top_k

# Instead of 10
results = retrieve(query, top_k=3)

4. Use async processing

results = await async_rag_query(query)

Issue 3: High Embedding Costs

Problem: Embedding costs too high

Solutions:

1. Use smaller dimensions

# Use Titan Embeddings V2 with 256 dimensions instead of 1536
embedding = get_embedding_v2(text, dimensions=256)

2. Cache embeddings

# Don't regenerate for same text
embedding = cache.get_embedding(text)

3. Batch process

# Instead of one at a time
embeddings = get_embeddings_batch(texts, batch_size=25)

4. Use cheaper models for reranking (see the sketch below)

# Use Haiku instead of Sonnet for scoring
score = rerank_with_haiku(query, docs)
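
rerank_with_haiku is referenced above but not defined; a minimal sketch that asks Claude 3 Haiku for a 0-10 relevance score per document (the prompt and score parsing are assumptions):

import boto3
import json

def rerank_with_haiku(query, docs):
    """
    Score each document's relevance to the query with Claude 3 Haiku (cheaper than Sonnet)
    """
    bedrock_runtime = boto3.client('bedrock-runtime')
    scores = []

    for doc in docs:
        prompt = (f"Rate from 0 to 10 how relevant this document is to the query.\n"
                  f"Query: {query}\nDocument: {doc['text'][:1000]}\n"
                  f"Respond with only the number.")

        response = bedrock_runtime.invoke_model(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            body=json.dumps({
                'anthropic_version': 'bedrock-2023-05-31',
                'messages': [{'role': 'user', 'content': prompt}],
                'max_tokens': 5,
                'temperature': 0
            })
        )

        text = json.loads(response['body'].read())['content'][0]['text'].strip()
        try:
            scores.append(float(text))
        except ValueError:
            scores.append(0.0)

    return scores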

Issue 4: Hallucination

Problem: Model making up information

Solutions:

1. Stricter prompts

prompt = """ONLY use information from the provided context.
If the answer is not in the context, say "I don't have that information."

Context: {context}

Question: {query}"""

2. Check retrieval score

if results[0]['score'] < 0.7:
    return "I don't have enough relevant information"

3. Add confidence scoring (see the sketch below)

confidence = calculate_confidence(results)
if confidence < 0.8:
    return "Low confidence warning: ..."

Issue 5: Token Limit Exceeded

Problem: Context too large for model

Solutions:

1. Reduce number of retrieved docs

# Instead of 10
results = retrieve(query, top_k=2)

2. Truncate long documents

def truncate_doc(doc, max_tokens=500):
    # Word count used as a rough approximation of tokens
    words = doc['text'].split()
    return ' '.join(words[:max_tokens])

3. Use summarization (see the sketch after this list)

# Summarize context before generation
summary = summarize_context(retrieved_docs)
answer = generate(query, summary)

4. Use models with larger context

# Claude 3 Sonnet supports 200K tokens
# Use for larger context needs
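
summarize_context is assumed above; a minimal sketch that uses Claude 3 Haiku to compress the retrieved documents into a shorter context block (model choice and prompt are assumptions):

import boto3
import json

def summarize_context(retrieved_docs, max_tokens=500):
    """
    Summarize retrieved documents into a compact context string
    """
    bedrock_runtime = boto3.client('bedrock-runtime')
    combined = "\n\n".join(doc['text'] for doc in retrieved_docs)

    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [{
                'role': 'user',
                'content': f"Summarize the key facts in these documents:\n\n{combined}"
            }],
            'max_tokens': max_tokens
        })
    )

    return json.loads(response['body'].read())['content'][0]['text']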

Summary

RAG (Retrieval-Augmented Generation) enhances LLMs by providing them with relevant external knowledge, enabling:
- Access to current and private data
- Reduced hallucinations
- Source attribution
- Cost-effective knowledge updates

AWS Bedrock offers two main approaches:
1. Knowledge Bases - Fully managed, quick setup, integrated with Agents
2. Custom RAG - Full control, advanced features, custom vector database

Key Components:
- Document processing and chunking
- Embedding generation (Titan, Cohere)
- Vector storage (OpenSearch, Pinecone, FAISS)
- Retrieval with reranking
- Response generation with citations

Best Practices:
- Choose appropriate chunk size (500-1000 tokens)
- Store rich metadata for filtering
- Optimize queries before retrieval
- Implement quality checks
- Monitor performance metrics

Optimization:
- Cache embeddings and results
- Batch process multiple queries
- Use async for concurrency
- Optimize vector indexes

For advanced use cases, move to custom RAG with full control over chunking, retrieval, and generation strategies. Start with Knowledge Bases for quick prototyping, then optimize as needed.